0ae2a3851cbfe1ec2f2c7237954b18c9951c76a3,runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java,FlinkMergingNonShuffleReduceFunction,reduce,#Iterable#Collector#,93
Before Change
Iterable<WindowedValue<KV<K, InputT>>> elements,
Collector<WindowedValue<KV<K, OutputT>>> out) throws Exception {
FlinkSingleOutputProcessContext<KV<K, InputT>, KV<K, OutputT>> processContext =
new FlinkSingleOutputProcessContext<>(
serializedOptions.getPipelineOptions(),
getRuntimeContext(),
doFn,
windowingStrategy,
sideInputs, out
);
OldPerKeyCombineFnRunner<K, InputT, AccumT, OutputT> combineFnRunner =
OldPerKeyCombineFnRunners.create(combineFn);
@SuppressWarnings("unchecked")
OutputTimeFn<? super BoundedWindow> outputTimeFn =
(OutputTimeFn<? super BoundedWindow>) windowingStrategy.getOutputTimeFn();
// collect all elements so that we can sort them; the whole input has to
// fit into memory
// this seems very imprudent, but it is correct for now
List<WindowedValue<KV<K, InputT>>> sortedInput = Lists.newArrayList();
for (WindowedValue<KV<K, InputT>> inputValue: elements) {
for (WindowedValue<KV<K, InputT>> exploded: inputValue.explodeWindows()) {
sortedInput.add(exploded);
}
}
Collections.sort(sortedInput, new Comparator<WindowedValue<KV<K, InputT>>>() {
@Override
public int compare(
WindowedValue<KV<K, InputT>> o1,
WindowedValue<KV<K, InputT>> o2) {
return Iterables.getOnlyElement(o1.getWindows()).maxTimestamp()
.compareTo(Iterables.getOnlyElement(o2.getWindows()).maxTimestamp());
}
});
// merge windows, we have to do it in an extra pre-processing step and
// can't do it as we go since the window of early elements would not
// be correct when calling the CombineFn
mergeWindow(sortedInput);
// iterate over the elements that are sorted by window timestamp
final Iterator<WindowedValue<KV<K, InputT>>> iterator = sortedInput.iterator();
// create accumulator using the first element's key
WindowedValue<KV<K, InputT>> currentValue = iterator.next();
K key = currentValue.getValue().getKey();
IntervalWindow currentWindow =
(IntervalWindow) Iterables.getOnlyElement(currentValue.getWindows());
InputT firstValue = currentValue.getValue().getValue();
processContext.setWindowedValue(currentValue);
AccumT accumulator = combineFnRunner.createAccumulator(key, processContext);
accumulator = combineFnRunner.addInput(key, accumulator, firstValue, processContext);
// we use this to keep track of the timestamps assigned by the OutputTimeFn
Instant windowTimestamp =
outputTimeFn.assignOutputTime(currentValue.getTimestamp(), currentWindow);
while (iterator.hasNext()) {
WindowedValue<KV<K, InputT>> nextValue = iterator.next();
IntervalWindow nextWindow = (IntervalWindow) Iterables.getOnlyElement(nextValue.getWindows());
if (currentWindow.equals(nextWindow)) {
// continue accumulating and merge windows
InputT value = nextValue.getValue().getValue();
processContext.setWindowedValue(nextValue);
accumulator = combineFnRunner.addInput(key, accumulator, value, processContext);
windowTimestamp = outputTimeFn.combine(
windowTimestamp,
outputTimeFn.assignOutputTime(nextValue.getTimestamp(), currentWindow));
} else {
// emit the value that we currently have
out.collect(
WindowedValue.of(
KV.of(key, combineFnRunner.extractOutput(key, accumulator, processContext)),
windowTimestamp,
currentWindow,
PaneInfo.NO_FIRING));
currentWindow = nextWindow;
InputT value = nextValue.getValue().getValue();
processContext.setWindowedValue(nextValue);
accumulator = combineFnRunner.createAccumulator(key, processContext);
accumulator = combineFnRunner.addInput(key, accumulator, value, processContext);
windowTimestamp = outputTimeFn.assignOutputTime(nextValue.getTimestamp(), currentWindow);
After Change
Iterable<WindowedValue<KV<K, InputT>>> elements,
Collector<WindowedValue<KV<K, OutputT>>> out) throws Exception {
PipelineOptions options = serializedOptions.getPipelineOptions();
FlinkSideInputReader sideInputReader =
new FlinkSideInputReader(sideInputs, getRuntimeContext());
PerKeyCombineFnRunner<K, InputT, AccumT, OutputT> combineFnRunner =
PerKeyCombineFnRunners.create(combineFn);
@SuppressWarnings("unchecked")
OutputTimeFn<? super BoundedWindow> outputTimeFn =
(OutputTimeFn<? super BoundedWindow>) windowingStrategy.getOutputTimeFn();
// collect all elements so that we can sort them; the whole input has to
// fit into memory
// this seems very imprudent, but it is correct for now
List<WindowedValue<KV<K, InputT>>> sortedInput = Lists.newArrayList();
for (WindowedValue<KV<K, InputT>> inputValue : elements) {
for (WindowedValue<KV<K, InputT>> exploded : inputValue.explodeWindows()) {
sortedInput.add(exploded);
}
}
Collections.sort(sortedInput, new Comparator<WindowedValue<KV<K, InputT>>>() {
@Override
public int compare(
WindowedValue<KV<K, InputT>> o1,
WindowedValue<KV<K, InputT>> o2) {
return Iterables.getOnlyElement(o1.getWindows()).maxTimestamp()
.compareTo(Iterables.getOnlyElement(o2.getWindows()).maxTimestamp());
}
});
// merge windows, we have to do it in an extra pre-processing step and
// can't do it as we go since the window of early elements would not
// be correct when calling the CombineFn
mergeWindow(sortedInput);
// iterate over the elements that are sorted by window timestamp
final Iterator<WindowedValue<KV<K, InputT>>> iterator = sortedInput.iterator();
// create accumulator using the first element's key
WindowedValue<KV<K, InputT>> currentValue = iterator.next();
K key = currentValue.getValue().getKey();
IntervalWindow currentWindow =
(IntervalWindow) Iterables.getOnlyElement(currentValue.getWindows());
InputT firstValue = currentValue.getValue().getValue();
AccumT accumulator =
combineFnRunner.createAccumulator(key, options, sideInputReader, currentValue.getWindows());
accumulator = combineFnRunner.addInput(key, accumulator, firstValue,
options, sideInputReader, currentValue.getWindows());
// we use this to keep track of the timestamps assigned by the OutputTimeFn
Instant windowTimestamp =
outputTimeFn.assignOutputTime(currentValue.getTimestamp(), currentWindow);
while (iterator.hasNext()) {
WindowedValue<KV<K, InputT>> nextValue = iterator.next();
IntervalWindow nextWindow =
(IntervalWindow) Iterables.getOnlyElement(nextValue.getWindows());
if (currentWindow.equals(nextWindow)) {
// continue accumulating and merge windows
InputT value = nextValue.getValue().getValue();
accumulator = combineFnRunner.addInput(key, accumulator, value,
options, sideInputReader, currentValue.getWindows());
windowTimestamp = outputTimeFn.combine(
windowTimestamp,
outputTimeFn.assignOutputTime(nextValue.getTimestamp(), currentWindow));
} else {
// emit the value that we currently have
out.collect(
WindowedValue.of(
KV.of(key, combineFnRunner.extractOutput(key, accumulator,
options, sideInputReader, currentValue.getWindows())),
windowTimestamp,
currentWindow,
PaneInfo.NO_FIRING));
currentWindow = nextWindow;
currentValue = nextValue;
InputT value = nextValue.getValue().getValue();
accumulator = combineFnRunner.createAccumulator(key,
options, sideInputReader, currentValue.getWindows());
accumulator = combineFnRunner.addInput(key, accumulator, value,
options, sideInputReader, currentValue.getWindows());
windowTimestamp = outputTimeFn.assignOutputTime(nextValue.getTimestamp(), currentWindow);
}